#
# Module providing the `Pool` class for managing a process pool
#
# multiprocessing/pool.py
#
# Copyright (c) 2007-2008, R Oudkerk --- see COPYING.txt
#

__all__ = ['Pool']

#
# Imports
#

import threading
import Queue
import itertools
import collections
import time

from multiprocessing import Process, cpu_count, TimeoutError
from multiprocessing.util import Finalize, debug

#
# Constants representing the state of a pool
#

RUN = 0
CLOSE = 1
TERMINATE = 2

#
# Miscellaneous
#

job_counter = itertools.count()

def mapstar(args):
    return map(*args)

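# `mapstar` unpacks a `(func, chunk)` pair produced by `Pool._get_tasks()`
# below, so a whole chunk of arguments can travel through the task queue as
# a single task; for instance mapstar((abs, (-1, -2))) == [1, 2].
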
#
# Code run by worker processes
#

def worker(inqueue, outqueue, initializer=None, initargs=()):
    put = outqueue.put
    get = inqueue.get
    if hasattr(inqueue, '_writer'):
        inqueue._writer.close()
        outqueue._reader.close()

    if initializer is not None:
        initializer(*initargs)

    while 1:
        try:
            task = get()
        except (EOFError, IOError):
            debug('worker got EOFError or IOError -- exiting')
            break

        if task is None:
            debug('worker got sentinel -- exiting')
            break

        job, i, func, args, kwds = task
        try:
            result = (True, func(*args, **kwds))
        except Exception, e:
            result = (False, e)
        put((job, i, result))

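# Each task is a 5-tuple `(job, i, func, args, kwds)` and each result a
# 3-tuple `(job, i, (success, value))`.  For example, `apply_async(f, (1,))`
# enqueues roughly (job_id, None, f, (1,), {}) and a worker answers with
# (job_id, None, (True, f(1))) -- or with (job_id, None, (False, exc))
# if `f` raised.
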
#
# Class representing a process pool
#

class Pool(object):
    '''
    Class which supports an async version of the `apply()` builtin
    '''
    Process = Process

    def __init__(self, processes=None, initializer=None, initargs=()):
        self._setup_queues()
        self._taskqueue = Queue.Queue()
        self._cache = {}
        self._state = RUN

        if processes is None:
            try:
                processes = cpu_count()
            except NotImplementedError:
                processes = 1

        self._pool = []
        for i in range(processes):
            w = self.Process(
                target=worker,
                args=(self._inqueue, self._outqueue, initializer, initargs)
                )
            self._pool.append(w)
            w.name = w.name.replace('Process', 'PoolWorker')
            w.daemon = True
            w.start()

        self._task_handler = threading.Thread(
            target=Pool._handle_tasks,
            args=(self._taskqueue, self._quick_put, self._outqueue, self._pool)
            )
        self._task_handler.daemon = True
        self._task_handler._state = RUN
        self._task_handler.start()

        self._result_handler = threading.Thread(
            target=Pool._handle_results,
            args=(self._outqueue, self._quick_get, self._cache)
            )
        self._result_handler.daemon = True
        self._result_handler._state = RUN
        self._result_handler.start()

        self._terminate = Finalize(
            self, self._terminate_pool,
            args=(self._taskqueue, self._inqueue, self._outqueue, self._pool,
                  self._task_handler, self._result_handler, self._cache),
            exitpriority=15
            )

    def _setup_queues(self):
        from .queues import SimpleQueue
        self._inqueue = SimpleQueue()
        self._outqueue = SimpleQueue()
        self._quick_put = self._inqueue._writer.send
        self._quick_get = self._outqueue._reader.recv

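    # `_quick_put`/`_quick_get` talk straight to the pipe ends underlying
    # the SimpleQueues, letting the single task-handler and result-handler
    # threads skip the queue wrappers' locking; `ThreadPool._setup_queues()`
    # below swaps in plain `Queue.Queue` methods instead.
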
    def apply(self, func, args=(), kwds={}):
        '''
        Equivalent of `apply()` builtin
        '''
        assert self._state == RUN
        return self.apply_async(func, args, kwds).get()

    def map(self, func, iterable, chunksize=None):
        '''
        Equivalent of `map()` builtin
        '''
        assert self._state == RUN
        return self.map_async(func, iterable, chunksize).get()

    def imap(self, func, iterable, chunksize=1):
        '''
        Equivalent of `itertools.imap()` -- can be MUCH slower than `Pool.map()`
        '''
        assert self._state == RUN
        if chunksize == 1:
            result = IMapIterator(self._cache)
            self._taskqueue.put((((result._job, i, func, (x,), {})
                         for i, x in enumerate(iterable)), result._set_length))
            return result
        else:
            assert chunksize > 1
            task_batches = Pool._get_tasks(func, iterable, chunksize)
            result = IMapIterator(self._cache)
            self._taskqueue.put((((result._job, i, mapstar, (x,), {})
                     for i, x in enumerate(task_batches)), result._set_length))
            return (item for chunk in result for item in chunk)

    def imap_unordered(self, func, iterable, chunksize=1):
        '''
        Like `imap()` method but ordering of results is arbitrary
        '''
        assert self._state == RUN
        if chunksize == 1:
            result = IMapUnorderedIterator(self._cache)
            self._taskqueue.put((((result._job, i, func, (x,), {})
                         for i, x in enumerate(iterable)), result._set_length))
            return result
        else:
            assert chunksize > 1
            task_batches = Pool._get_tasks(func, iterable, chunksize)
            result = IMapUnorderedIterator(self._cache)
            self._taskqueue.put((((result._job, i, mapstar, (x,), {})
                     for i, x in enumerate(task_batches)), result._set_length))
            return (item for chunk in result for item in chunk)

    def apply_async(self, func, args=(), kwds={}, callback=None):
        '''
        Asynchronous equivalent of `apply()` builtin
        '''
        assert self._state == RUN
        result = ApplyResult(self._cache, callback)
        self._taskqueue.put(([(result._job, None, func, args, kwds)], None))
        return result

    def map_async(self, func, iterable, chunksize=None, callback=None):
        '''
        Asynchronous equivalent of `map()` builtin
        '''
        assert self._state == RUN
        if not hasattr(iterable, '__len__'):
            iterable = list(iterable)

        if chunksize is None:
            chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
            if extra:
                chunksize += 1

        task_batches = Pool._get_tasks(func, iterable, chunksize)
        result = MapResult(self._cache, chunksize, len(iterable), callback)
        self._taskqueue.put((((result._job, i, mapstar, (x,), {})
                              for i, x in enumerate(task_batches)), None))
        return result

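    # The default chunksize above aims for roughly four chunks per worker.
    # For example, 100 items on a 4-process pool give
    # divmod(100, 16) == (6, 4), and the nonzero remainder bumps the
    # chunksize to 7 (15 chunks in all).
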
    @staticmethod
    def _handle_tasks(taskqueue, put, outqueue, pool):
        thread = threading.current_thread()

        for taskseq, set_length in iter(taskqueue.get, None):
            i = -1
            for i, task in enumerate(taskseq):
                if thread._state:
                    debug('task handler found thread._state != RUN')
                    break
                try:
                    put(task)
                except IOError:
                    debug('could not put task on queue')
                    break
            else:
                if set_length:
                    debug('doing set_length()')
                    set_length(i+1)
                continue
            break
        else:
            debug('task handler got sentinel')

        try:
            # tell result handler to finish when cache is empty
            debug('task handler sending sentinel to result handler')
            outqueue.put(None)

            # tell workers there is no more work
            debug('task handler sending sentinel to workers')
            for p in pool:
                put(None)
        except IOError:
            debug('task handler got IOError when sending sentinels')

        debug('task handler exiting')

    @staticmethod
    def _handle_results(outqueue, get, cache):
        thread = threading.current_thread()

        while 1:
            try:
                task = get()
            except (IOError, EOFError):
                debug('result handler got EOFError/IOError -- exiting')
                return

            if thread._state:
                assert thread._state == TERMINATE
                debug('result handler found thread._state=TERMINATE')
                break

            if task is None:
                debug('result handler got sentinel')
                break

            job, i, obj = task
            try:
                cache[job]._set(i, obj)
            except KeyError:
                pass

        while cache and thread._state != TERMINATE:
            try:
                task = get()
            except (IOError, EOFError):
                debug('result handler got EOFError/IOError -- exiting')
                return

            if task is None:
                debug('result handler ignoring extra sentinel')
                continue
            job, i, obj = task
            try:
                cache[job]._set(i, obj)
            except KeyError:
                pass

        if hasattr(outqueue, '_reader'):
            debug('ensuring that outqueue is not full')
            # If we don't make room available in outqueue then
            # attempts to add the sentinel (None) to outqueue may
            # block.  There is guaranteed to be no more than 2 sentinels.
            try:
                for i in range(10):
                    if not outqueue._reader.poll():
                        break
                    get()
            except (IOError, EOFError):
                pass

        debug('result handler exiting: len(cache)=%s, thread._state=%s',
              len(cache), thread._state)

    @staticmethod
    def _get_tasks(func, it, size):
        it = iter(it)
        while 1:
            x = tuple(itertools.islice(it, size))
            if not x:
                return
            yield (func, x)

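    # For example, _get_tasks(f, [1, 2, 3, 4, 5], 2) yields
    # (f, (1, 2)), (f, (3, 4)) and (f, (5,)); the final chunk may be short.
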
    def __reduce__(self):
        raise NotImplementedError(
              'pool objects cannot be passed between processes or pickled'
              )

    def close(self):
        debug('closing pool')
        if self._state == RUN:
            self._state = CLOSE
            self._taskqueue.put(None)

    def terminate(self):
        debug('terminating pool')
        self._state = TERMINATE
        self._terminate()

    def join(self):
        debug('joining pool')
        assert self._state in (CLOSE, TERMINATE)
        self._task_handler.join()
        self._result_handler.join()
        for p in self._pool:
            p.join()

    @staticmethod
    def _help_stuff_finish(inqueue, task_handler, size):
        # task_handler may be blocked trying to put items on inqueue
        debug('removing tasks from inqueue until task handler finished')
        inqueue._rlock.acquire()
        while task_handler.is_alive() and inqueue._reader.poll():
            inqueue._reader.recv()
            time.sleep(0)

    @classmethod
    def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool,
                        task_handler, result_handler, cache):
        # this is guaranteed to only be called once
        debug('finalizing pool')

        task_handler._state = TERMINATE
        taskqueue.put(None)                 # sentinel

        debug('helping task handler/workers to finish')
        cls._help_stuff_finish(inqueue, task_handler, len(pool))

        assert result_handler.is_alive() or len(cache) == 0

        result_handler._state = TERMINATE
        outqueue.put(None)                  # sentinel

        if pool and hasattr(pool[0], 'terminate'):
            debug('terminating workers')
            for p in pool:
                p.terminate()

        debug('joining task handler')
        task_handler.join(1e100)

        debug('joining result handler')
        result_handler.join(1e100)

        if pool and hasattr(pool[0], 'terminate'):
            debug('joining pool workers')
            for p in pool:
                p.join()

#
# Class whose instances are returned by `Pool.apply_async()`
#

class ApplyResult(object):

    def __init__(self, cache, callback):
        self._cond = threading.Condition(threading.Lock())
        self._job = job_counter.next()
        self._cache = cache
        self._ready = False
        self._callback = callback
        cache[self._job] = self

    def ready(self):
        return self._ready

    def successful(self):
        assert self._ready
        return self._success

    def wait(self, timeout=None):
        self._cond.acquire()
        try:
            if not self._ready:
                self._cond.wait(timeout)
        finally:
            self._cond.release()

    def get(self, timeout=None):
        self.wait(timeout)
        if not self._ready:
            raise TimeoutError
        if self._success:
            return self._value
        else:
            raise self._value

    def _set(self, i, obj):
        self._success, self._value = obj
        if self._callback and self._success:
            self._callback(self._value)
        self._cond.acquire()
        try:
            self._ready = True
            self._cond.notify()
        finally:
            self._cond.release()
        del self._cache[self._job]

#
# Class whose instances are returned by `Pool.map_async()`
#

class MapResult(ApplyResult):

    def __init__(self, cache, chunksize, length, callback):
        ApplyResult.__init__(self, cache, callback)
        self._success = True
        self._value = [None] * length
        self._chunksize = chunksize
        if chunksize <= 0:
            self._number_left = 0
            self._ready = True
        else:
            self._number_left = length//chunksize + bool(length % chunksize)

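    # `_number_left` counts whole chunks plus any short tail chunk; e.g. a
    # length of 10 with chunksize 3 gives 10//3 + bool(10 % 3) == 4 chunks.
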
    def _set(self, i, success_result):
        success, result = success_result
        if success:
            self._value[i*self._chunksize:(i+1)*self._chunksize] = result
            self._number_left -= 1
            if self._number_left == 0:
                if self._callback:
                    self._callback(self._value)
                del self._cache[self._job]
                self._cond.acquire()
                try:
                    self._ready = True
                    self._cond.notify()
                finally:
                    self._cond.release()

        else:
            self._success = False
            self._value = result
            del self._cache[self._job]
            self._cond.acquire()
            try:
                self._ready = True
                self._cond.notify()
            finally:
                self._cond.release()

#
# Class whose instances are returned by `Pool.imap()`
#

class IMapIterator(object):

    def __init__(self, cache):
        self._cond = threading.Condition(threading.Lock())
        self._job = job_counter.next()
        self._cache = cache
        self._items = collections.deque()
        self._index = 0
        self._length = None
        self._unsorted = {}
        cache[self._job] = self

    def __iter__(self):
        return self

    def next(self, timeout=None):
        self._cond.acquire()
        try:
            try:
                item = self._items.popleft()
            except IndexError:
                if self._index == self._length:
                    raise StopIteration
                self._cond.wait(timeout)
                try:
                    item = self._items.popleft()
                except IndexError:
                    if self._index == self._length:
                        raise StopIteration
                    raise TimeoutError
        finally:
            self._cond.release()

        success, value = item
        if success:
            return value
        raise value

    __next__ = next                    # XXX

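    # `_set()` is called from the result-handler thread; results arriving
    # out of order are parked in `_unsorted` until every earlier index has
    # been delivered, preserving input order for the consumer.
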
    def _set(self, i, obj):
        self._cond.acquire()
        try:
            if self._index == i:
                self._items.append(obj)
                self._index += 1
                while self._index in self._unsorted:
                    obj = self._unsorted.pop(self._index)
                    self._items.append(obj)
                    self._index += 1
                self._cond.notify()
            else:
                self._unsorted[i] = obj

            if self._index == self._length:
                del self._cache[self._job]
        finally:
            self._cond.release()

    def _set_length(self, length):
        self._cond.acquire()
        try:
            self._length = length
            if self._index == self._length:
                self._cond.notify()
                del self._cache[self._job]
        finally:
            self._cond.release()

#
# Class whose instances are returned by `Pool.imap_unordered()`
#

class IMapUnorderedIterator(IMapIterator):

    def _set(self, i, obj):
        self._cond.acquire()
        try:
            self._items.append(obj)
            self._index += 1
            self._cond.notify()
            if self._index == self._length:
                del self._cache[self._job]
        finally:
            self._cond.release()

#
#
#

class ThreadPool(Pool):

    from .dummy import Process

    def __init__(self, processes=None, initializer=None, initargs=()):
        Pool.__init__(self, processes, initializer, initargs)

    def _setup_queues(self):
        self._inqueue = Queue.Queue()
        self._outqueue = Queue.Queue()
        self._quick_put = self._inqueue.put
        self._quick_get = self._outqueue.get

    @staticmethod
    def _help_stuff_finish(inqueue, task_handler, size):
        # put sentinels at head of inqueue to make workers finish
        inqueue.not_empty.acquire()
        try:
            inqueue.queue.clear()
            inqueue.queue.extend([None] * size)
            inqueue.not_empty.notify_all()
        finally:
            inqueue.not_empty.release()
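
#
# Minimal usage sketch (illustrative only): client code normally reaches
# this class as `multiprocessing.Pool`, and `square` is a stand-in example
# function, not part of this module.
#
#     from multiprocessing import Pool
#
#     def square(x):
#         return x*x
#
#     if __name__ == '__main__':
#         pool = Pool(processes=2)
#         print pool.map(square, range(5))        # [0, 1, 4, 9, 16]
#         r = pool.apply_async(square, (3,))
#         print r.get(timeout=5)                  # 9
#         for v in pool.imap_unordered(square, range(3)):
#             print v                             # 0, 1, 4 in some order
#         pool.close()
#         pool.join()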